Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.
In this notebook, we'll train two classifiers to predict survivors in the Titanic dataset. We'll use this classic machine learning problem as a brief introduction to using Apache Spark local mode in a notebook.
In [1]:
import pyspark
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
First we create a SparkContext, the main object in the Spark API. This call may take a few seconds to return as it fires up a JVM under the covers.
In [2]:
conf = pyspark.SparkConf()
conf.setAppName("Pyspark Test")
sc = pyspark.SparkContext(conf=conf)
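This picks up whatever master the environment provides. Had we wanted to pin the context to local mode explicitly, the setup would look something like this (a sketch; the "local[*]" master string is an assumption about how this notebook is run, not part of the original cell):

conf = pyspark.SparkConf()
conf.setAppName("Pyspark Test")
conf.setMaster("local[*]")  # assumption: local mode, one worker thread per core
sc = pyspark.SparkContext(conf=conf)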
We point the context at a CSV file on disk. The result is an RDD, not the content of the file: textFile is a Spark transformation, so nothing is read from disk yet.
In [3]:
raw_rdd = sc.textFile("/opt/datasets/titanic.csv")
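As a small sketch of this laziness, chaining another transformation also returns immediately, and the RDD's debug string shows the planned steps without executing them:

# Returns instantly; Spark only records that this map should happen later.
upper_rdd = raw_rdd.map(lambda line: line.upper())
# Inspect the recorded lineage; still no data has been read.
print(raw_rdd.toDebugString())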
We query the RDD for the number of lines in the file. This call causes the file to be read and the result computed: count is a Spark action.
In [4]:
raw_rdd.count()
Out[4]:
We query for the first five rows of the RDD. Even though the data is small, we shouldn't get into the habit of pulling the entire dataset into the notebook: many datasets we might want to work with using Spark will be far too large to fit in the memory of a single machine.
In [5]:
raw_rdd.take(5)
Out[5]:
We see a header row followed by a set of data rows. We filter out the header to define a new RDD containing only the data rows.
In [6]:
header = raw_rdd.first()
data_rdd = raw_rdd.filter(lambda line: line != header)
We take a random sample of the data rows to better understand the possible values.
In [7]:
data_rdd.takeSample(False, 5, 0)
Out[7]:
We see that the first value in every row is a passenger number. The next three values are the passenger attributes we might use to predict passenger survival: ticket class, age group, and gender. The final value is the survival ground truth.
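Since the sampled output isn't shown here, an illustrative raw row, inferred from the parsing logic in the next cell (not actual file contents), looks like:

"1","1st class","adults","man","yes"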
Now we define a function to turn the passenger attributes into structured LabeledPoint objects.
In [8]:
def row_to_labeled_point(line):
    '''
    Builds a LabeledPoint consisting of:

    survival (truth): 0=no, 1=yes
    ticket class: 0=1st class, 1=2nd class, 2=3rd class
    age group: 0=child, 1=adults
    gender: 0=man, 1=woman
    '''
    passenger_id, klass, age, sex, survived = [seg.strip('"') for seg in line.split(',')]

    # "1st class" -> 0, "2nd class" -> 1, "3rd class" -> 2
    klass = int(klass[0]) - 1

    if (age not in ['adults', 'child'] or
            sex not in ['man', 'women'] or
            survived not in ['yes', 'no']):
        raise RuntimeError('unknown value')

    features = [
        klass,
        (1 if age == 'adults' else 0),
        (1 if sex == 'women' else 0)
    ]
    return LabeledPoint(1 if survived == 'yes' else 0, features)
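As a quick sanity check, we can call the function on a single hand-written row before mapping it over the whole RDD (the input line below is illustrative, not taken from the file):

row_to_labeled_point('"1","1st class","adults","man","yes"')
# expected result: LabeledPoint(1.0, [0.0, 1.0, 0.0])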
We apply the function to all rows.
In [10]:
labeled_points_rdd = data_rdd.map(row_to_labeled_point)
We take a random sample of the resulting points to inspect them.
In [11]:
labeled_points_rdd.takeSample(False, 5, 0)
Out[11]:
We split the transformed data into a training (70%) and test set (30%), and print the total number of items in each segment.
In [12]:
training_rdd, test_rdd = labeled_points_rdd.randomSplit([0.7, 0.3], seed=0)
In [13]:
training_count = training_rdd.count()
test_count = test_rdd.count()
In [14]:
training_count, test_count
Out[14]:
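Since randomSplit partitions the RDD without dropping any rows, the two counts should sum to the total number of labeled points; a quick assertion makes that explicit:

assert training_count + test_count == labeled_points_rdd.count()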
Now we train a DecisionTree model. We specify that we're training a binary classifier (i.e., there are two possible outcomes). We also specify that all of our features are categorical, along with the number of possible categories for each.
In [15]:
model = DecisionTree.trainClassifier(training_rdd,
                                     numClasses=2,
                                     categoricalFeaturesInfo={
                                         0: 3,  # ticket class: 3 categories
                                         1: 2,  # age group: 2 categories
                                         2: 2   # gender: 2 categories
                                     })
We now apply the trained model to the feature values in the test set to get the list of predicted outcomes.
In [15]:
predictions_rdd = model.predict(test_rdd.map(lambda x: x.features))
We bundle our predictions with the ground-truth outcome for each passenger in the test set. zip pairs the two RDDs element by element; this works because predictions_rdd was derived from test_rdd by a map, so both have the same partitioning and element counts.
In [16]:
truth_and_predictions_rdd = test_rdd.map(lambda lp: lp.label).zip(predictions_rdd)
Now we compute the accuracy (the fraction of predicted survival outcomes that match the actual outcomes) and display the decision tree for good measure.
In [17]:
accuracy = truth_and_predictions_rdd.filter(lambda v_p: v_p[0] == v_p[1]).count() / float(test_count)
print('Accuracy =', accuracy)
print(model.toDebugString())
For a simple comparison, we also train and test a LogisticRegressionWithSGD model.
In [18]:
model = LogisticRegressionWithSGD.train(training_rdd)
In [19]:
predictions_rdd = model.predict(test_rdd.map(lambda x: x.features))
In [20]:
labels_and_predictions_rdd = test_rdd.map(lambda lp: lp.label).zip(predictions_rdd)
In [21]:
accuracy = labels_and_predictions_rdd.filter(lambda v_p: v_p[0] == v_p[1]).count() / float(test_count)
print('Accuracy =', accuracy)
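LogisticRegressionWithSGD can be sensitive to its step size and iteration count. MLlib also provides LogisticRegressionWithLBFGS, which typically converges more reliably and exposes the same train/predict interface; a minimal sketch of swapping it in (variable names here are illustrative):

from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Train with L-BFGS instead of SGD; evaluation proceeds exactly as above.
lbfgs_model = LogisticRegressionWithLBFGS.train(training_rdd)
lbfgs_predictions_rdd = lbfgs_model.predict(test_rdd.map(lambda x: x.features))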